
feat: Implement index for buffer #24954

Merged
pauldix merged 3 commits into main from pd/buffer-indexed on May 6, 2024

Conversation

Member

@pauldix pauldix commented May 2, 2024

This implements an index for the data in the table buffers. For now, by default, it indexes all tags, keeping a mapping from each tag key/value pair to the row ids that have it in the buffer. When queries ask for record batches from the table buffer, the filter expression is evaluated to determine whether a record batch can be built on the fly using only the row ids that match the index. If the filter isn't covered by the index, the entire record batch from the buffer is returned.

This also updates the logic in segment state to only request a record batch with the projection. The query executor was updated so that it pushes the filter and projection down to the request to get table chunks.

While implementing this, I believe I uncovered a bug where, when limits are hit, a write still attempts to get buffered. I'll log a follow-up to look at that.
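
To make the shape of that index concrete, here is a minimal standalone sketch of the tag key/value to row-id mapping described above (illustrative types and names, not the actual ones in this PR):

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative sketch of a tag index: tag column name -> tag value -> row ids.
#[derive(Default)]
struct TagIndex {
    columns: HashMap<String, HashMap<String, HashSet<usize>>>,
}

impl TagIndex {
    /// Record that `row_id` has `value` for tag column `key`.
    fn add_row(&mut self, key: &str, value: &str, row_id: usize) {
        self.columns
            .entry(key.to_string())
            .or_default()
            .entry(value.to_string())
            .or_default()
            .insert(row_id);
    }

    /// Row ids matching `key = value`, if the pair is indexed.
    fn matching_rows(&self, key: &str, value: &str) -> Option<&HashSet<usize>> {
        self.columns.get(key).and_then(|values| values.get(value))
    }
}

fn main() {
    let mut index = TagIndex::default();
    index.add_row("host", "a", 0);
    index.add_row("host", "b", 1);
    // Only row 0 matches host = a, so a record batch could be built from that row alone.
    assert_eq!(index.matching_rows("host", "a").map(|rows| rows.len()), Some(1));
}
```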

@@ -319,6 +319,19 @@ impl TableDefinition {
        self.schema = schema;
    }

    pub(crate) fn index_columns(&self) -> Vec<String> {

Member Author

I'm not sure that we'll want to continue to default to having all tag columns be indexed. We'll need a new API & CLI to set the columns explicitly that a user wants indexed. We may find that by default we don't want to index anything.
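
For context, the diff above only shows the signature of index_columns; a hypothetical sketch of a "default to all tag columns" helper, with stand-in types rather than the PR's actual ones, could look like this:

```rust
// Hypothetical sketch only; the PR's real TableDefinition and column types differ.
#[derive(Clone, Copy, PartialEq)]
enum ColumnType {
    Tag,
    Field,
    Time,
}

struct TableDefinition {
    columns: Vec<(String, ColumnType)>,
}

impl TableDefinition {
    // Default index set: every tag column in the table.
    fn index_columns(&self) -> Vec<String> {
        self.columns
            .iter()
            .filter(|(_, column_type)| *column_type == ColumnType::Tag)
            .map(|(name, _)| name.clone())
            .collect()
    }
}
```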

@@ -303,10 +329,22 @@ impl DatabaseBuffer {
        segment_key: &PartitionKey,
        table_batch: TableBatch,
    ) {
        if !self.table_buffers.contains_key(&table_name) {
            // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog

Member Author

I created #24955 to follow up and check what's going on here.

Contributor

Does this happen without your changes at all?

Member Author

Without my changes, the limit test passes, but that's because the buffer doesn't look for the table in the db_schema. So it looks to me like the limit check is hit, but it still calls to buffer the write, which passes on the old code path because it doesn't look into the catalog. That would be a bug because the buffered write would make it into the WAL and into the in-memory buffer without a corresponding catalog update. At least, that's what I think is going on here.

Contributor

@mgattozzi mgattozzi left a comment

I have quite a few suggestions to make the code easier to read and understand. I'll probably need to do another pass to make sure I'm fully understanding all of these changes.

Comment on lines 476 to 479
        filters
            .iter()
            .map(|_f| Ok(TableProviderFilterPushDown::Inexact))
            .collect()

Contributor

You're not using the exprs here at all, nor does it need to be a Result since this will always be Ok. I think you can just do this:

Suggested change:

        vec![TableProviderFilterPushDown::Inexact; filters.len()]

Like here:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=1d812cf42a2fb158006d0ab96ae346f3

Member Author

It'll have to be a result because that's the trait signature, but good call on the vec.
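
Sketched standalone (with a stand-in enum; in the PR the element type is DataFusion's TableProviderFilterPushDown and the signature comes from the trait), the resolved shape is presumably just the suggested vec! wrapped in the Ok the trait requires:

```rust
// Stand-in types to keep the sketch self-contained; the pattern is simply
// Ok(vec![elem; len]), which works because the element type is Clone.
#[derive(Clone, Debug, PartialEq)]
enum FilterPushDown {
    Inexact,
}

fn supports_filters_pushdown(filters: &[&str]) -> Result<Vec<FilterPushDown>, String> {
    Ok(vec![FilterPushDown::Inexact; filters.len()])
}

fn main() {
    let pushdown = supports_filters_pushdown(&["host = 'a'", "region = 'us'"]).unwrap();
    assert_eq!(pushdown, vec![FilterPushDown::Inexact, FilterPushDown::Inexact]);
}
```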

Comment on lines +204 to +214
        if !buffered_data.database_buffers.contains_key(db_name) {
            let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
            buffered_data.database_buffers.insert(
                db_name.clone(),
                DatabaseBuffer {
                    table_buffers: HashMap::new(),
                    db_schema,
                },
            );
        }
        let db_buffer = buffered_data.database_buffers.get_mut(db_name).unwrap();

Contributor

I think you can keep the initial form by using the entry API like so, sans borrow-check issues:

Suggested change:

        let db_buffer = buffered_data
            .database_buffers
            .entry(db_name)
            .or_insert_with(|| DatabaseBuffer {
                table_buffers: HashMap::new(),
                db_schema: catalog
                    .db_schema(db_name)
                    .expect("db schema should exist"),
            });

Contributor

Looking at the PR it seems you had done something similar earlier. Was there a reason you didn't do so here?

Member Author

That doesn't work because db_name is a borrowed string, which the entry API won't accept. So I did it this way to avoid a string clone, rather than use the entry API.
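
To make that tradeoff concrete, here is a minimal standalone sketch (nothing influxdb-specific, names illustrative): HashMap::entry takes the key by value, so with a borrowed key you either allocate an owned key on every call or do the contains_key/insert/get_mut dance that only clones on the insert path.

```rust
use std::collections::HashMap;

fn main() {
    let mut buffers: HashMap<String, Vec<u64>> = HashMap::new();
    let db_name: &str = "mydb"; // borrowed, like db_name in the code above

    // Option A: entry() needs an owned String, so a key is allocated on every call,
    // even when the entry already exists.
    buffers.entry(db_name.to_string()).or_default().push(1);

    // Option B: the shape used in the PR; the key is only cloned on the insert path.
    if !buffers.contains_key(db_name) {
        buffers.insert(db_name.to_string(), Vec::new());
    }
    buffers.get_mut(db_name).unwrap().push(2);

    assert_eq!(buffers["mydb"], vec![1, 2]);
}
```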


@@ -77,6 +77,9 @@ pub enum Error {

    #[error("walop in file {0} contained data for more than one segment, which is invalid")]
    WalOpForMultipleSegments(String),

    #[error("error: {0}")]
    Other(#[from] anyhow::Error),

Contributor

Is there a reason for us using just a catch all here rather than specific variants?

Member Author

I can change them into specific variants; I wasn't sure what to do here. These errors are ones that actually shouldn't happen, so the receiver can't do anything about them. I was just tossing them into a catch-all so that they would at least get logged and send some sort of message back to the user. Previously they were doing a panic, which I wanted to remove. I can add specific thiserror variants if you think this is gross 😄

Member Author

Refactored to have a specific error, incoming...
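
Purely as illustration of the direction (the added variant below is hypothetical, not the one in the follow-up commit), the refactor swaps the anyhow catch-all for a named thiserror variant so the failure still surfaces without a panic:

```rust
use thiserror::Error;

// Hypothetical sketch only; the real variant name and message live in the follow-up commit.
#[derive(Debug, Error)]
pub enum Error {
    #[error("walop in file {0} contained data for more than one segment, which is invalid")]
    WalOpForMultipleSegments(String),

    #[error("unexpected internal state: {0}")]
    UnexpectedState(String),
}
```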

Comment on lines +71 to +82
        if !self.data.contains_key(&f.name) {
            let mut tag_builder = StringDictionaryBuilder::new();
            // append nulls for all previous rows
            for _ in 0..(row_index + self.row_count) {
                tag_builder.append_null();
            }
            self.data.insert(f.name.clone(), Builder::Tag(tag_builder));
        }
        let b = self
            .data
            .get_mut(&f.name)
            .expect("tag builder should exist");

Contributor

This could be simplified to this I believe:

    let b = self.data.entry(&f.name)
        .or_insert_with(|| {
            let mut tag_builder = StringDictionaryBuilder::new();
            // append nulls for all previous rows
            for _ in 0..(row_index + self.row_count) {
                tag_builder.append_null();
            }
            Builder::Tag(tag_builder)
        });

Member Author

entry API won't take a borrow

Contributor

ah dang. That's unfortunate.

Contributor

One day we might want to consider this option, but it feels like a lot of arm-twisting to get the compiler to accept this:

https://idubrov.name/rust/2018/06/01/tricking-the-hashmap.html

        &self,
        schema: SchemaRef,
        filter: &[Expr],
    ) -> Result<Vec<RecordBatch>, anyhow::Error> {

Contributor

We only ever return one RecordBatch. Why return a Vec here?

Member Author

In theory it was going to potentially return many batches as other things were added to the table buffer, like splitting up the writes based on time. I can change it to a single batch and then add the Vec back later if that's how things progress?

Contributor

I think that makes more sense. Better to do what we have now and change the type sig later if need be.

Comment on lines +270 to +281
        if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
            if *op == datafusion::logical_expr::Operator::Eq {
                if let Expr::Column(c) = left.as_ref() {
                    if let Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))) =
                        right.as_ref()
                    {
                        return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
                    }
                }
            }
        }
    }

Contributor

I think you can simplify the code like so, but maybe not.

Suggested change:

        if let Expr::BinaryExpr(BinaryExpr {
            left: Expr::Column(c),
            op: datafusion::logical_expr::Operator::Eq,
            right: Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))),
        }) = expr
        {
            return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
        }

Member Author

Doesn't seem like I can because of the Box

Contributor

ah dang Box patterns would have solved this problem and they've been unstable since 1.0 basically 😭
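
For what it's worth, on stable Rust the two innermost `if let`s can still be collapsed by matching the dereferenced boxes as a tuple; an untested sketch of the same check, using the same types as the quoted code:

```rust
// Same logic as the nested ifs above, with the two innermost `if let`s collapsed
// into one tuple match on the dereferenced boxes.
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
    if *op == datafusion::logical_expr::Operator::Eq {
        if let (Expr::Column(c), Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v)))) =
            (left.as_ref(), right.as_ref())
        {
            return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
        }
    }
}
```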

@pauldix pauldix requested a review from mgattozzi May 3, 2024 16:09
@pauldix pauldix merged commit 8e79667 into main May 6, 2024
8 checks passed
@pauldix pauldix deleted the pd/buffer-indexed branch May 6, 2024 16:59
mgattozzi added a commit that referenced this pull request May 14, 2024
Alternate Title: The DB Schema only ever has one table

This is a story of subtle bugs, gnashing of teeth, and hair pulling.
Gather round as I tell you the tale of an Arc that pointed to an
outdated schema.

In #24954 we introduced an index for the database, as this will allow us
to perform faster queries. When we added that code, this check was added:

```rust
if !self.table_buffers.contains_key(&table_name) {
    // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
    // and we've gotten here, it means we're dropping a write.
    if let Some(table) = self.db_schema.get_table(&table_name) {
        self.table_buffers.insert(
            table_name.clone(),
            TableBuffer::new(segment_key.clone(), &table.index_columns()),
        );
    } else {
        return;
    }
}
```

Adding the return there let us continue on with our day and make the
tests pass. However, just because these tests passed didn't mean the
code was correct, as I would soon find out. With follow-up ticket
#24955 created, we merged the changes and I began to debug the issue.

Note that we assumed a single write was being dropped due to limits,
because the limits test is what failed. What began was a chase of a few
days to prove that the limits weren't what was failing. This took a
while, but the conclusion was that the limits weren't causing it; it
did, however, expose the fact that a database only ever had one table,
which was weird.

I then began to dig into this a bit more. Why would there only be one
table? We weren't just dropping one write; we were dropping all but
*one* write, or so it seemed. Many printlns/hours later it became clear
that we were actually updating the schema! It existed in the Catalog,
but not in the pointer to the schema in the DatabaseBuffer struct, so
what gives?

Well we need to look at [another piece of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L540-L541).

In the `validate_or_insert_schema_and_partitions` function for the
WriteBuffer we have this bit of code:

```rust
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
```

Here we pass in a reference to the schema in the catalog. However, when we
[go a bit further down](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L565-L568)
we see this code:

```rust
    let schema = match schema {
        Cow::Owned(s) => Some(s),
        Cow::Borrowed(_) => None,
    };
```

What this means is that if we make a change, we clone the original and
update the clone. We *aren't* making a change to the original schema. When we
go back up the call stack we get to [this bit of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L456-L460):

```rust
    if let Some(schema) = result.schema.take() {
        debug!("replacing schema for {:?}", schema);


        catalog.replace_database(sequence, Arc::new(schema))?;
    }
```

We are updating the catalog with the new schema, but how does that work?

```rust
        inner.databases.insert(db.name.clone(), db);
```

Oh. Oh no. We're just overwriting it. Which means that the
DatabaseBuffer has an Arc to the *old* schema, not the *new* one. Which
means that the buffer will get the first copy of the schema with the
first new table, but *none* of the later ones. The solution is to make
sure that the buffer has a pointer to the Catalog instead, so that the
DatabaseBuffer always sees the most up to date schema and only the
Catalog handles the schema itself. This commit makes those changes to
make sure it works.

This was a very, very subtle mutability/pointer bug given the
intersection of valid borrow checking and some writes making it in, but
luckily we caught it. It does mean, though, that until this fix is in,
the changes between the Index PR and now should be considered subtly
broken and shouldn't be used for anything beyond writing to a single
table per DB.

TL;DR: We should ask the Catalog what the schema is, as it contains the up
to date version of it.

Closes #24955
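
The Arc behaviour at the heart of this is easy to reproduce in isolation; a minimal standalone sketch (nothing influxdb-specific) of why replacing a map entry doesn't update existing Arc clones:

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    // "Catalog": database name -> current schema (here just a list of table names).
    let mut catalog: HashMap<String, Arc<Vec<String>>> = HashMap::new();
    catalog.insert("mydb".to_string(), Arc::new(vec!["cpu".to_string()]));

    // "DatabaseBuffer": holds its own clone of the Arc it saw at creation time.
    let buffer_schema = Arc::clone(&catalog["mydb"]);

    // Catalog update: a *new* Arc replaces the map entry; existing clones are untouched.
    catalog.insert(
        "mydb".to_string(),
        Arc::new(vec!["cpu".to_string(), "mem".to_string()]),
    );

    // The buffer still sees the old schema with a single table.
    assert_eq!(buffer_schema.len(), 1);
    assert_eq!(catalog["mydb"].len(), 2);
}
```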
mgattozzi added a commit that referenced this pull request May 16, 2024